package com.example.hotitemanalysis;

import com.example.bean.UserBehavior;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Flink streaming job that reports the five most viewed items per sliding window.
 *
 * <p>Pipeline, in order:
 * <ol>
 *   <li>read comma-separated user-behavior records from a text file;</li>
 *   <li>parse each line into a {@link UserBehavior} and assign event-time timestamps;</li>
 *   <li>with the Table API, keep only {@code pv} records and count them per item over a
 *       1-hour window sliding every 5 minutes;</li>
 *   <li>with SQL, rank the counts inside each window via {@code ROW_NUMBER()} and keep the
 *       rows ranked 5 or better;</li>
 *   <li>print the result as a retract stream.</li>
 * </ol>
 */
public class HotItemsWithSql {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "E:\\Gitee\\UserBehaviorAnalysis\\HotItemAnalysis\\src\\main\\resources\\UserBehavior.csv";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the input file path, otherwise
     *             {@link #DEFAULT_INPUT_PATH} is read
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // For interactive testing the file source can be swapped for
        // env.socketTextStream("localhost", 9090).
        DataStream<String> inputStream = env.readTextFile(inputPath);

        DataStream<UserBehavior> behaviorDataStream = inputStream
                .map(HotItemsWithSql::parseUserBehavior)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        // Scale the record timestamp to milliseconds
                        // (presumably the input is in epoch seconds -- confirm against the data set).
                        return userBehavior.getTimestamp() * 1000L;
                    }
                });

        // Create the table execution environment using the Blink planner in streaming mode.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Convert the stream into a table; the record timestamp becomes the rowtime attribute "ts".
        Table dataTable = tableEnv.fromDataStream(behaviorDataStream, "itemId,behavior,timestamp.rowtime as ts");

        // Table API: group by item and sliding window (1 hour long, advancing every 5 minutes)
        // and count the page views.
        Table windowAggTable = dataTable
                .filter("behavior='pv'")
                .window(Slide.over("1.hours")
                        .every("5.minutes").on("ts").as("w")
                )
                .groupBy("itemId,w")
                .select("itemId,w.end as windowEnd, itemId.count as cnt");

        // SQL: register the aggregated rows as view "agg", then use ROW_NUMBER() to order the
        // counts within each window and keep the Top-N rows.
        DataStream<Row> aggStream = tableEnv.toAppendStream(windowAggTable, Row.class);
        tableEnv.createTemporaryView("agg", aggStream, "itemId, windowEnd, cnt");

        Table resultTable = tableEnv.sqlQuery("select * from (select *, ROW_NUMBER() over (partition by windowEnd order by cnt desc) as row_num from agg ) where row_num <=5 ");

        // The ranking result is emitted as a retract stream rather than an append stream.
        tableEnv.toRetractStream(resultTable, Row.class).print();

        env.execute("job-hot items with sql");
    }

    /**
     * Parses one comma-separated input line into a {@link UserBehavior}.
     *
     * <p>Field order: userId, itemId, categoryId, behavior, timestamp.
     *
     * @param line a single record from the input file
     * @return the populated bean
     * @throws NumberFormatException if a numeric field cannot be parsed
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than five fields
     */
    private static UserBehavior parseUserBehavior(String line) {
        String[] fields = line.split(",");
        UserBehavior userBehavior = new UserBehavior();
        // valueOf replaces the deprecated boxing constructors while keeping the boxed types.
        userBehavior.setUserId(Long.valueOf(fields[0]));
        userBehavior.setItemId(Long.valueOf(fields[1]));
        userBehavior.setCategoryId(Integer.valueOf(fields[2]));
        userBehavior.setBehavior(fields[3]);
        userBehavior.setTimestamp(Long.valueOf(fields[4]));
        return userBehavior;
    }
}
